import lombok.extern.slf4j.Slf4j;

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.*;

/**
 * 循环反复任务池
 * 会有线程查看线程任务执行时间 不允许有执行时间超过两秒的线程
 * 最小执行间隔时间为3秒 因为 检查任务线程的频率为2s
 *
 * @author: xiuwei
 * @version:
 */
@Slf4j
public class RecurringTaskContainer {

  /**
   * 核心数量为10 排序数量为100 默认排序规则  多余线程存活时间为60秒的 定长线程
   * 用于执行定时任务
   */
  private static final ExecutorService calculatorThread = new ThreadPoolExecutor(10, 200,
    60L, TimeUnit.SECONDS, new SynchronousQueue<>());

  /**
   * 核心运行数量为4的定时任务池 用于拉起定时任务
   */
  private static final ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(4);

  /**
   * 定时任务对象  以时间间隔进行区分
   */
  private Map<Integer, Map<Callable, String>> tasks = new ConcurrentHashMap();
  /**
   * 在线程池内正在执行的任务  以时间间隔进行区分
   */
  private Map<Integer, Map<Future, String>> futures = new ConcurrentHashMap();

  /**
   * 单例
   *
   * @return 实例
   */
  public static final RecurringTaskContainer getInstance() {
    return LazyHolder.INSTANCE;
  }

  /**
   * 向定时任务集合里添加定时任务
   *
   * @param interval 时间间隔
   * @param task     任务
   * @return 返回添加的任务
   */
  public <T> Callable addRecurringCallable(Integer interval, String describe, Callable<T> task) {
    if (interval < 2) {
      throw new RuntimeException("该工具不允许执行3s以下间隔的任务");
    }
    log.info("循环任务池添加任务-{}", describe);
    if (getTasks().containsKey(interval)) {
      getTasks().get(interval).put(task, describe);
    } else {
      Map<Callable, String> map = new ConcurrentHashMap();
      map.put(task, describe);
      getTasks().put(interval, map);
      futures.put(interval, new ConcurrentHashMap());
      startScheduledTask(interval);
    }
    return task;
  }

  /**
   * 向定时任务集合里添加定时任务
   *
   * @param interval 时间间隔
   * @param task     任务
   * @return 返回添加的任务
   */
  public <T> Callable addRecurringTask(Integer interval, String describe, Task4Recurring<T> task) {
    if (interval < 2) {
      throw new RuntimeException("该工具不允许执行3s以下间隔的任务");
    }
    Callable<T> callable = () -> {
      T o;
      try {
        o = task.task();
      } catch (Exception e) {
        log.error("定时任务本身产生异常", e);
        throw e;
      }
      return o;
    };
    return addRecurringCallable(interval, describe, callable);
  }

  /**
   * 移除定时任务
   *
   * @param interval 时间间隔
   * @param task     执行的任务线程
   */
  public void removeTask(Integer interval, Callable task) {
    if (this.getTasks().containsKey(interval)) {
      this.getTasks().get(interval).remove(task);
    }
  }

  private synchronized Map<Integer, Map<Callable, String>> getTasks() {
    return this.tasks;
  }

  /**
   * 移除定时任务
   *
   * @param task 任务线程
   */
  public void removeTask(Callable task) {
    for (Integer key : getTasks().keySet()) {
      if (this.getTasks().get(key).containsKey(task)) {
        log.info("循环任务池移除任务-{}", this.getTasks().get(key).get(task));
        this.getTasks().get(key).remove(task);
        break;
      }
    }
  }

  /**
   * 开启定时任务线程 用于拉起具体的定时任务
   *
   * @param interval 时间间隔
   */
  public void startScheduledTask(Integer interval) {
    scheduledThreadPool.scheduleAtFixedRate(() -> {
      Map<Callable, String> map = getTasks().get(interval);
      futures.put(interval, new ConcurrentHashMap());
      Future future = null;
      for (Callable c : map.keySet()) {
        try {
          //log.info("提交循环任务到执行器-{}",map.get(c));
          future = calculatorThread.submit(c);
          futures.get(interval).put(future, map.get(c));
        } catch (Exception e) {
          if (future != null) {
            futures.get(interval).remove(future);
          }
          if(e instanceof RejectedExecutionException){
            log.error("提交任务过多超过排序队列长度，任务被拒绝");
          }else{
          log.error("执行循环任务时发生异常", e);}
        }
        future = null;
      }
    }, 0, interval, TimeUnit.SECONDS);
    scheduledThreadPool.scheduleAtFixedRate(() -> {
      Map<Future, String> map = futures.get(interval);
      Iterator<Future> iter = map.keySet().iterator();
      Future future;
      while (iter.hasNext()) {
        try {
          future = iter.next();
          if (future.isCancelled()) {
            log.info("清除已取消的任务{}，删除该任务", map.get(future));
            map.remove(future);
            continue;
          }
          if (!future.isDone()) {
            log.info("发现未完成的超时任务{}，取消该任务", map.get(future));
            future.cancel(false);
          }
        } catch (Exception e) {
          e.printStackTrace();
        }
      }
    }, interval - 1, interval, TimeUnit.SECONDS);
  }

  /**
   * 它的意义就是为了打印一下日志 就是为了包裹一层try catch
   *
   * @param <T>
   */
  public interface Task4Recurring<T> {
    T task() throws Exception;
  }

  /**
   * 为了单例的内部类
   *
   * @return 实例
   */
  private static class LazyHolder {
    private static final RecurringTaskContainer INSTANCE = new RecurringTaskContainer();
  }
}
